# Databricks notebook source
# Load the raw scraped 51job postings (JSON) from the mounted storage account.
# NOTE(review): relies on the Databricks-provided `spark` session global.
df = spark.read.json("/mnt/storage/jobs/51jobs.json")

# COMMAND ----------

# Inspect the schema Spark inferred for the raw postings.
df.printSchema()

# COMMAND ----------

import pyspark.sql.functions as F
from pyspark.sql.functions import split, round
from pyspark.sql.types import DoubleType

def year_salary(content):
    """Convert a 51job salary string into an estimated yearly salary.

    Handles the common 51job formats, e.g. "1-1.5万/月", "20万/年",
    "150元/天", "20元/时", "1.5-2万·13薪". The result is expressed in
    units of 10,000 CNY (万).

    Parameters:
        content: the raw salary string, or None for a null column value.

    Returns:
        float: estimated yearly salary in 万 CNY; 0.0 for None or "".

    Raises:
        ValueError: for salary strings with no parseable number
        (e.g. "面议") — same as the original behavior.
    """

    def calc(s):
        # Normalise a single amount to units of 万 (10,000 CNY).
        if s.find("万") != -1:
            return float(s.replace("万", ""))
        if s.find("千") != -1:
            return float(s.replace("千", "")) / 10
        if s.find("元") != -1:
            return float(s.replace("元", "")) / 10000
        if s == "":
            return 0.0
        return float(s)

    # Spark passes None for null column values into the UDF; treat as
    # "no salary listed" rather than crashing with AttributeError.
    if content is None:
        return 0.0

    m = 12.0      # pay periods per year (default: monthly salary)
    meand = 0.0   # mean of the quoted salary range, in 万

    # Strip qualifiers ("及以下", "以下", "以上") so the numeric part
    # parses cleanly. The original omitted "以上", which made strings
    # like "2万以上" raise ValueError inside float().
    content = content.replace("及", "").replace("以下", "").replace("以上", "")

    if content.find("年") != -1:
        # Quoted per year: no multiplier needed.
        m = 1
        content = content.split("/")[0]
    if content.find("天") != -1:
        # Quoted per day: assumes 365 paid days/year — TODO confirm.
        m = 365
        content = content.split("/")[0]
    if content.find("时") != -1:
        # Quoted per hour: assumes 6 hours/day, 365 days/year — TODO confirm.
        m = 365 * 6
        content = content.split("/")[0]
    if content.find("薪") != -1:
        # "X万·13薪" style: the part after "·" is the number of monthly
        # payments per year.
        m = float(content.split("·")[1].replace("薪", ""))
        content = content.split("·")[0]

    if content.find("-") == -1:
        meand = calc(content)
    else:
        # Salary range: use the midpoint of the two endpoints. Note the
        # low endpoint usually carries no unit ("1-1.5万"), so calc()
        # treats it as already being in 万.
        low, high = content.split("-")
        meand = (calc(low) + calc(high)) / 2
    return meand * m

# Wrap the plain-Python converter as a Spark UDF returning DoubleType.
# NOTE(review): this rebinds the name `year_salary`, so the original
# Python function is no longer directly callable after this line.
year_salary=F.udf(year_salary,DoubleType())

# Project the columns needed downstream, derive `city`/`area` by splitting
# "jobAreaString" on "·", and compute a rounded `yearSalary` via the UDF.
# `repartition(1)` forces a single output part-file so the shell copy in
# the next cell can grab one JSON file; overwrite replaces any prior run.
df.select("companyName","companySizeString","companyTypeString","degreeString","industryType1Str","industryType2Str","jobAreaCode","jobAreaString",split("jobAreaString","·")[0].alias("city"),split("jobAreaString","·")[1].alias("area"),"jobName","jobTags","lat","lon","provideSalaryString",round(year_salary("provideSalaryString")).alias("yearSalary"),"updateDateTime","workYearString","jobHref").repartition(1).write.mode("overwrite").json("/mnt/storage/jobs/jobs")

# COMMAND ----------

# MAGIC %sh
# MAGIC 
# MAGIC cp /dbfs/mnt/storage/jobs/jobs/*.json ../Datasets/51jobs.json
